<!doctype html>
<html lang="zh" class="h-100">
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">

    <link rel="stylesheet" href="assets/css/bootstrap.min.css">
    <link rel="stylesheet" href="assets/css/style.css">

    <title>集群架构：分布式进程通讯</title>
  </head>
  <body class="h-100">
  <div class="container-fluid h-100 p-2">
  	<h1>集群架构：分布式进程通讯</h1>
  	<h3>集群架构:分布式进程通讯</h3><p>如果beyod是以多进程方式运行，不同的客户端会连接到不同的工作进程上，因为这些工作属于不同的地址空间，它们之间无法直接通讯，必须借助一些共享通道（如信号、unix domain socket、消息队列、socket等）才能通讯。</p><p>beyod开发之初，就充分考虑到分布式应用环境，为此内置了订阅发布模型的客户端和服务器端组件， 可以很容易构建跨服务器的分布式应用架构。 </p><p>为了支持分布式消息通讯，同时引入了一些概念，需要预先深入了解：</p><h4>server_id</h4><p>每个beyod server实例的唯一id, 默认为1， 不同的beyod server实例，必须不能相同。取值范围1~65535。应该为server组件配置其server_id:  <br>config/main.php</p><pre><code>'components'=&gt;[
    'server' =&gt; [
        'class' =&gt; 'beyoio\Server',
        'server_id' =&gt; 174,
        //...
    ]
]</code></pre><h4>worker_id</h4><p>工作进程的编号，这是一个只读属性，表示工作进程的顺序号，从1开始递增。<br>==如果一个进程崩溃后被主进程重新生成后，worker_id是不变的==。我们可以根据worker_id实现进程区别和标识。  </p><p>运行时可以通过以下方式取得当前工作进程的workder_id:</p><pre><code>$worker_id = \Yii::$app-&gt;server-&gt;getWorkerId();</code></pre><h4>GPID</h4><p>全局进程唯一标识(Global process unique identification)</p><p>即工作进程在整个集群中的全局唯一标识，算法为：</p><pre><code>$GPID = ($server_id &lt;&lt; 16) + $worker_id;</code></pre><p>可以使用以下方式取得当前进程在集群中的唯一标识：</p><pre><code>$gpid = \Yii::$app-&gt;server-&gt;getGPID();</code></pre><p>典型的，在beyod的分布式消息推送架构中，GPID作为频道名称使用。</p><h4>dispatcher</h4><p>分发器，beyod内建了分布式消息推送服务器组件，它的作用：</p><ol><li>接受客户端的订阅请求</li><li>接收客户端的发布请求，根据频道号，将消息推送到相关订阅者。</li><li>支持多种数据结构：</li></ol><p><strong>发布订阅模型</strong>  <br><strong>hash表共享存储</strong>  <br><strong>基于SplMaxHeap的优先队列</strong>  <br><strong>消息广播</strong>  </p><p>dispatcher使用内存存储hash/队列数据，规划前要考虑到内存容量限制的问题。 生产环境建议使用数据库/缓存系统/队列系统来存储数据。dispatcher最适合的就场景就是分发消息而不是存储数据。</p><p>考虑到dipatch server仅仅只是转发消息，并不参与复杂的业务逻辑处理，它本身即便以单进程方式运行，也很难形成瓶颈。如果业务量较大，单个dispatcher server无法支持，那么完全可以部署多个dispatcher，实现负载均衡架构。</p><p><strong>dispatcher server的配置</strong>  <br>config/main.php</p><pre><code>'components'=&gt;[
    'server'=&gt;[
        'class' =&gt; 'beyoio\Server',
        'worker_num'=&gt;1, //以单进程方式运行，保证数据是共享唯一的
        'listeners' =&gt; [
            'class' =&gt; 'beyoio\Listener',
            'listen'=&gt;'tcp://0.0.0.0:2491'
            'handler'=&gt;'beyoio\dispatcher\Handler',
            'parser' =&gt;'beyoio\dispatcher\Parser'
        ],
    ]
]</code></pre><p><strong>客户端</strong>  <br>例如：我们可以在worker启动之后，就订阅频道并准消息的接收处理(假设dispatcher的ip为192.168.0.101)：  <br>appMyServer.php中的内容：</p><pre><code>namespace app;

use beyoio\Server;
use beyoio\dispatcher\Client;

class MyServer extends Server
{
    public $client;
    
    //此回调在工作进程启动后执行
    public function onWorkerStart($workerId, $GPID)
    {
        parent::onWorkerStart($workerId, $GPID);
        $this-&gt;connectDispatcher($GPID);
    }
    
    
    /**
     * 连接订阅服务器
     */
    protected function connectDispatcher()
    {
        $options = [
            'target' =&gt; 'tcp://192.168.0.101:2491',
            'reconnect_interval' =&gt; 6, //连接被对方断开后，6秒后重连。
        ];
        
        $this-&gt;client = new Client($options);
        $this-&gt;client-&gt;default_channel=$this-&gt;getGpid();//连接成功后，即订阅频道，频道号即为GPID
        
        
        $this-&gt;client-&gt;on(Server::ON_MESSAGE, function(MessageEvent $event){
            print_r($event-&gt;message); //收到消息时的回调
        });
        
        //连接到分发服务器。
        $this-&gt;client-&gt;connect();
    }
}
</code></pre><p><strong>beyoiodispatcherClient</strong></p><p>beyoiodispatcherClient分发器客户端连接组件：</p><pre><code>&lt;?php

namespace beyoio\dispatcher;

use beyoio\StreamClient;
use beyoio\Server;
use beyoio\IOEvent;

class Client extends StreamClient
{

    public $reconnect_interval = 3; //连接异常中断后的重连间隔3秒
    
    //连接后自动订阅的频道
    public $default_channel = [];
    
    /**
     * 订阅一个或多个频道
     * @param array|string $keys The channel names to subscribe to
     */
    public function subscribe($keys)
    {
        return $this-&gt;send(['command'=&gt;'subscribe', 'key'=&gt;$keys, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    /**
     * 取消订阅一个或多个频道
     * @param array|string $keys The channel names to cancel
     */
    public function unsubscribe($keys)
    {
        return $this-&gt;send(['command'=&gt;'unsubscribe', 'key'=&gt;$keys, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    /**
     * 向一个或多个频道上推送消息
     * @param string|array $key
     * @param mixed $value
     */
    public function publish($key, $value)
    {
        return $this-&gt;send(['command'=&gt;'publish', 'key'=&gt;$keys, 'value'=&gt;$value, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    /**
     * 向所有频道广播消息
     * @param mixed $value
     */
    public function broadcat($value)
    {
        return $this-&gt;send(['command'=&gt;'broadcat','value'=&gt;$value, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    
    /**
     * 获取已经订阅的频道集合
     */
    public function channels()
    {
        $this-&gt;send(['command' =&gt; 'channels', 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    
    /**
     * 设置一个hash表条目
     * @param string $key
     * @param mixed $value
     * @param int $expire 缓存过期绝对时间戳 0表示永不过期
     */
    public function set($key, $value, $expire=0)
    {
        return $this-&gt;send(['command'=&gt;'set', 'key'=&gt;$key, 'value'=&gt;$value, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    /**
     * 读取一个hash表条目
     * @param string $key
     */
    public function get($key)
    {
        return $this-&gt;send(['command'=&gt;'get', 'key'=&gt;$key, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    /**
     * 删除一个hash表条目
     * @param string $key
     */    
    public function delete($key)
    {
        return $this-&gt;send(['command'=&gt;'delete', 'key'=&gt;$key, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    //删除所有hash条目
    public function deleteAll()
    {
        return $this-&gt;send(['command'=&gt;'deleteall','seq'=&gt;$this-&gt;getSequence()]);
    }
    
    
    /**
     * 向队列中追加记录
     * @param string $key queue name
     * @param mixed $value data
     * @param int $priority 
     * @see http://php.net/manual/en/splpriorityqueue.insert.php
     */
    public function qpush($key, $value)
    {
        return $this-&gt;send(['command'=&gt;'qpush', 'key'=&gt;$key,'value'=&gt;$value, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    /**
     * 从队列中取出记录
     * @param string $key queue name
     * @param string $block whether enable blocking mode. In blocking mode, the server returns only if there is data in the queue
     * @return void|boolean|number
     */
    public function qpop($key, $block=false){
        return $this-&gt;send(['command'=&gt;'qpop', 'key'=&gt;$key, 'block'=&gt;$block, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    /**
     * 清空指定队列
     * @param string $key queue name
     */
    public function qdelete($key)
    {
        return $this-&gt;send(['command'=&gt;'qdelete', 'key'=&gt;$key, 'seq'=&gt;$this-&gt;getSequence()]);
    }
    
    /**
     * 清空所有队列
     * @param string $key queue name
     */
    public function qdeleteAll($key)
    {
        return $this-&gt;send(['command'=&gt;'qdeleteall', 'key'=&gt;$key, 'seq'=&gt;$this-&gt;getSequence()]);
    }
}</code></pre><p><strong>协议格式</strong><br>dispatcher消息使用二进制json格式，由包头和数据部分组成。使用json格式而不是PHP序列化，就很容易兼容多种编程语言。 <br>包头是一个4字节无符号的整数，指明了整个数据包的长度，数据部分即json字符串。以下描述中，省去包头部分，着重关注数据部分：</p><ol><li>订阅请求</li></ol><p>{"command":"subscribe", "key": [], "seq":""}</p><pre><code>key:是一个字符串数组，指定了要订阅的频道    
seq:当前消息的唯一标识，由client自动生成。   </code></pre><p>响应结果： <br>{"command":"subscribe","key":[], "code":0,"message":"ok", "seq":""}</p><ol><li>取消订阅：</li></ol><p>{"command":"unsubscribe", "key": [], "seq":""} <br>响应结果：  <br>{"command":"unsubscribe","key":[], "code":0,"message":"ok", "seq":""}</p><ol><li>发布消息</li></ol><p>{"command":"publish", "key": [],"value": "", "seq":""}    <br>value即为消息内容  <br>响应结果：  <br>{"command":"publish", "key": [],"value": "", "seq":""}</p><ol><li>频道消息结构：</li></ol><p>{"command":"message", "value":"", "source":"消息发送者的ip和端口"}</p>  </div>
  <script src="assets/js/jquery.min.js"></script>
  <script src="assets/js/popper.min.js"></script>
  <script src="assets/js/bootstrap.min.js"></script>

<script>
$(function(){
  var url = self.location;
  parent.$('a').removeClass('font-weight-bold');
  parent.$('a[href="240.html"]').addClass('font-weight-bold');
});
</script>  
  
  </body>
</html>